package com.mimo.logic.session.service.impl;

import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import com.mimo.common.comet.dto.ZoneDTO;
import com.mimo.common.configuration.mrlock.annotation.DistributedRedisLock;
import com.mimo.common.utils.JsonUtils;
import com.mimo.logic.room.service.IRoomService;
import com.mimo.logic.session.constants.SessionKeys;
import com.mimo.logic.session.model.SessionInfo;
import com.mimo.logic.session.service.ISessionService;

/**
 * Redis-backed implementation of {@link ISessionService}.
 *
 * <p>
 * Two Redis structures are used:
 * <ul>
 * <li>the hash {@link SessionKeys#LOGIC_SESSION_DETAILS}: uid -&gt; JSON-serialized {@link SessionInfo};</li>
 * <li>the sorted set {@link SessionKeys#LOGIC_SESSION_BEAT}: uid scored by the epoch-millisecond timestamp of its
 * most recent heartbeat.</li>
 * </ul>
 */
@Service
public class SessionServiceImpl implements ISessionService {

  @Autowired
  private RedisTemplate<String, String> redisTemplate;

  @Autowired
  private IRoomService roomService;

  /**
   * Container-injected reference to this service, used for the internal {@code upsert} call instead of
   * {@code this}. NOTE(review): presumably so that {@link DistributedRedisLock} is applied through the proxy --
   * confirm against the lock aspect configuration.
   */
  @Autowired
  private ISessionService selfService;

  /**
   * Records a heartbeat for the given user.
   *
   * <p>
   * When a zone is supplied and no session details are stored for the uid yet, a new session is created (which
   * also writes the heartbeat). Otherwise only the heartbeat score is refreshed.
   *
   * <p>
   * Heartbeat maintenance is deliberately loose: stale entries are cleared either by a periodic scan in the
   * business layer or by an explicit logout initiated by the business.
   *
   * @param uid the user identifier
   * @param zone the zone the user is connected through; may be {@code null}, in which case no session is created
   * @return always {@code true}
   */
  @Override
  public boolean touch(String uid, ZoneDTO zone) {
    if (Objects.nonNull(zone)
        && !redisTemplate.<String, String> opsForHash().hasKey(SessionKeys.LOGIC_SESSION_DETAILS, uid)) {
      // Take a single instant so that the created and last-modified stamps of a new session are identical;
      // each field still gets its own Date object because Date is mutable.
      Date now = new Date();
      SessionInfo session = new SessionInfo();
      session.setUid(uid);
      session.setZone(zone);
      session.setCreatedDate(now);
      session.setLastModifiedDate(new Date(now.getTime()));
      selfService.upsert(session);
    } else {
      redisTemplate.opsForZSet().add(SessionKeys.LOGIC_SESSION_BEAT, uid, System.currentTimeMillis());
    }
    return true;
  }

  /**
   * Stores (creates or overwrites) the session details and refreshes the heartbeat of the session's user in one
   * pipelined round trip.
   *
   * @param session the session to store; its uid is used as hash field and sorted-set member
   * @return the same {@code session} instance that was passed in
   */
  @Override
  @DistributedRedisLock(key = SessionKeys.LOGIC_SESSION_LOCKER_PREFIX + "#{#session.uid}", expired = 60, waited = 60)
  public SessionInfo upsert(SessionInfo session) {
    double timestamp = System.currentTimeMillis();
    // Encode and serialize before opening the pipeline so that a serialization failure cannot leave a
    // heartbeat queued without the matching details.
    byte[] member = utf8(session.getUid());
    byte[] payload = JsonUtils.toJsonBytes(session);
    redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      connection.zAdd(utf8(SessionKeys.LOGIC_SESSION_BEAT), timestamp, member);
      connection.hSet(utf8(SessionKeys.LOGIC_SESSION_DETAILS), member, payload);
      return null;
    });
    return session;
  }

  /**
   * Removes the session of the given user: leaves every room the user has joined, then deletes the heartbeat and
   * the stored details.
   *
   * @param uid the user identifier
   * @return the session details that were stored before deletion, or empty if none could be read
   */
  @Override
  @DistributedRedisLock(key = SessionKeys.LOGIC_SESSION_LOCKER_PREFIX + "#{#uid}", expired = 60, waited = 60)
  public Optional<SessionInfo> delete(String uid) {
    // When a user's session is removed, automatically leave all joined rooms.
    roomService.listJointRooms(uid).forEach(rid -> roomService.leave(rid, uid));

    byte[] member = utf8(uid);
    byte[] detailsKey = utf8(SessionKeys.LOGIC_SESSION_DETAILS);
    byte[] beatKey = utf8(SessionKeys.LOGIC_SESSION_BEAT);

    // Read the details first, then purge the session data, all in one pipeline.
    List<Object> results = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
      connection.hGet(detailsKey, member);

      // Data cleanup.
      connection.zRem(beatKey, member);
      connection.hDel(detailsKey, member);
      return null;
    });

    // Index 0 is the result of the hGet queued first in the pipeline.
    Object stored = results.get(0);
    if (Objects.isNull(stored)) {
      return Optional.empty();
    }
    String json = String.class.cast(stored);
    if (!StringUtils.hasText(json)) {
      return Optional.empty();
    }
    // ofNullable: do not fail with a NullPointerException if the parser yields null.
    return Optional.ofNullable(JsonUtils.parseJson(json, SessionInfo.class));
  }

  /**
   * Looks up the stored session details of a user.
   *
   * @param uid the user identifier
   * @return the parsed session, or empty if nothing (or only blank text) is stored for the uid
   */
  @Override
  public Optional<SessionInfo> findSessionByUser(String uid) {
    String json = redisTemplate.<String, String> opsForHash().get(SessionKeys.LOGIC_SESSION_DETAILS, uid);
    if (!StringUtils.hasText(json)) {
      return Optional.empty();
    }
    // ofNullable: do not fail with a NullPointerException if the parser yields null.
    return Optional.ofNullable(JsonUtils.parseJson(json, SessionInfo.class));
  }

  /**
   * Lists uids whose heartbeat score lies within {@code [start, end]}.
   *
   * @param start lower bound of the heartbeat score (inclusive)
   * @param end upper bound of the heartbeat score (inclusive); must be {@code >= start}
   * @param count maximum number of uids to return; must be {@code > 0}
   * @return the matching uids, at most {@code count} of them, taken from offset 0
   * @throws IllegalArgumentException if {@code end < start} or {@code count <= 0}
   */
  @Override
  public Collection<String> getByLastAccessTimeBetween(long start, long end, long count) {
    Assert.isTrue(end >= start, "End 必须 >= start");
    Assert.isTrue(count > 0, "count 必须大于0");
    return redisTemplate.opsForZSet().rangeByScore(SessionKeys.LOGIC_SESSION_BEAT, start, end, 0, count);
  }

  /**
   * Returns the timestamp of the last heartbeat recorded for a user.
   *
   * @param uid the user identifier; must not be {@code null}
   * @return the heartbeat score (epoch milliseconds as written by {@code touch}/{@code upsert}), or empty if the
   *         uid has no heartbeat entry
   * @throws NullPointerException if {@code uid} is {@code null}
   */
  @Override
  public Optional<Long> getLastAccessTimeByUid(String uid) {
    Objects.requireNonNull(uid);
    // The access time is the member's score; rank() would only give its position within the sorted set.
    Double score = redisTemplate.opsForZSet().score(SessionKeys.LOGIC_SESSION_BEAT, uid);
    return Optional.ofNullable(score).map(Double::longValue);
  }

  /**
   * Loads and parses every stored session.
   *
   * @return all sessions currently present in the details hash
   */
  @Override
  public Collection<SessionInfo> listAll() {
    return redisTemplate.<String, String> opsForHash().entries(SessionKeys.LOGIC_SESSION_DETAILS).values().stream()
        .map(json -> JsonUtils.parseJson(json, SessionInfo.class)).collect(Collectors.toList());
  }

  /**
   * Encodes a key or member as UTF-8 for the raw connection API, independent of the platform default charset.
   */
  private static byte[] utf8(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }

}
